package org.com.gr.capserver.config.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.com.gr.capserver.config.mqtt.mqttCallBack.AbsMqttCallBack;
import org.com.gr.capserver.config.mqtt.mqttCallBack.MqttCallBackContext;
import org.com.gr.capserver.config.netty.BootNettyClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * ClassName: MqttClientManager
 * Description:
 *
 * @author binbin_hao
 * @date 2023/9/8 10:42
 */
@Slf4j
@Component
public class MqttClientManager {
    public static Map<String, MqttClient> mqttClients=new ConcurrentHashMap<>();

    @Autowired
    MqttCallBackContext mqttCallBackContext;

    public static MqttClient getMqttClientById(String clientName) {
        return mqttClients.get(clientName);
    }
    public void createMqttClient(MqttClientProperties mqttClientProperties){
        MemoryPersistence persistence=new MemoryPersistence();
        try {
            MqttClient mqttClient=new MqttClient(mqttClientProperties.getUrl(),mqttClientProperties.getClientId(),persistence);
            MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
            mqttConnectOptions.setUserName(mqttClientProperties.getUserName());
            mqttConnectOptions.setPassword(mqttClientProperties.getPassword().toCharArray());
            mqttConnectOptions.setCleanSession(true);
            //设置自动重连
            mqttConnectOptions.setAutomaticReconnect(mqttClientProperties.isCleanSession());
            //断开连接5秒后重新连接
            mqttConnectOptions.setMaxReconnectDelay(5000);
            mqttConnectOptions.setKeepAliveInterval(mqttClientProperties.getKeepalive());
            mqttConnectOptions.setConnectionTimeout(mqttClientProperties.getTimeout());
            AbsMqttCallBack callBack=mqttCallBackContext.getCallBack(mqttClientProperties.getClientName());
            if (callBack==null){
                callBack=mqttCallBackContext.getCallBack("default");
            }
            callBack.setClientName(mqttClientProperties.getClientName());
            callBack.setConnectOptions(mqttConnectOptions);
            callBack.setTopic(mqttClientProperties.getSubscribeTopic());
            mqttClient.setCallback(callBack);
            mqttClient.connect(mqttConnectOptions);
            log.info("MQTT客户端：{}链接成功，订阅主题：{}",mqttClientProperties.getClientName(),mqttClientProperties.getSubscribeTopic());
            mqttClient.subscribe(mqttClientProperties.getSubscribeTopic(),mqttClientProperties.getDefaultQos());
            //建立链接后，放到map中
            mqttClients.putIfAbsent(mqttClientProperties.getClientName(),mqttClient);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
